## 权限控制概述
Skills 的权限控制是确保系统安全和数据保护的重要机制。本节将详细介绍 Skills 的权限模型、权限类型和权限管理方法。
## 权限模型

### 1. 基于角色的访问控制(RBAC)

#### 1.1 角色定义
角色类型:

**管理员角色**
- 完全访问权限
- 可以创建、修改、删除 Skills
- 可以管理用户和角色

**开发者角色**
- 可以创建和修改 Skills
- 可以执行 Skills
- 不能删除其他用户的 Skills

**用户角色**
- 只能执行 Skills
- 不能创建或修改 Skills
- 访问受限的资源和功能

**访客角色**
- 只读权限
- 只能查看 Skill 文档
- 不能执行任何操作
#### 1.2 权限定义
class Permission:
    """A named permission such as ``"skill:create"``.

    Two permissions are equal when their names are equal. This lets a
    permission rebuilt from its name, e.g. ``Permission("skill:read", "")``,
    be found by ``permission in role.permissions``; with the default
    identity-based equality that membership test can never succeed for a
    freshly constructed object.

    Args:
        name: Permission identifier, e.g. ``"skill:create"``.
        description: Human-readable description; not part of equality.
    """

    def __init__(self, name, description):
        self.name = name
        self.description = description

    def __eq__(self, other):
        if not isinstance(other, Permission):
            return NotImplemented
        return self.name == other.name

    def __hash__(self):
        # Must agree with __eq__, which looks at the name only.
        return hash(self.name)

    def __repr__(self):
        return f"Permission({self.name!r}, {self.description!r})"


# Skill permissions
SKILL_CREATE = Permission("skill:create", "创建 Skill")
SKILL_READ = Permission("skill:read", "读取 Skill")
SKILL_UPDATE = Permission("skill:update", "更新 Skill")
SKILL_DELETE = Permission("skill:delete", "删除 Skill")
SKILL_EXECUTE = Permission("skill:execute", "执行 Skill")

# Resource permissions
RESOURCE_READ = Permission("resource:read", "读取资源")
RESOURCE_WRITE = Permission("resource:write", "写入资源")
RESOURCE_DELETE = Permission("resource:delete", "删除资源")

# System permissions
SYSTEM_CONFIG = Permission("system:config", "配置系统")
SYSTEM_MONITOR = Permission("system:monitor", "监控系统")

#### 1.3 角色权限映射
# (Section 1.3: role-to-permission mapping.)


class Role:
    """A named role together with the permissions it grants.

    Args:
        name: Role identifier, e.g. ``"admin"``.
        permissions: The permissions granted to holders of this role. The
            object passed in is stored as-is, not copied.
    """

    def __init__(self, name, permissions):
        self.name = name
        self.permissions = permissions

    def __repr__(self):
        return f"Role({self.name!r}, {self.permissions!r})"


# Role definitions
ADMIN = Role("admin", [
    SKILL_CREATE, SKILL_READ, SKILL_UPDATE, SKILL_DELETE, SKILL_EXECUTE,
    RESOURCE_READ, RESOURCE_WRITE, RESOURCE_DELETE,
    SYSTEM_CONFIG, SYSTEM_MONITOR,
])

DEVELOPER = Role("developer", [
    SKILL_CREATE, SKILL_READ, SKILL_UPDATE, SKILL_EXECUTE,
    RESOURCE_READ, RESOURCE_WRITE,
])

USER = Role("user", [
    SKILL_READ, SKILL_EXECUTE,
    RESOURCE_READ,
])

GUEST = Role("guest", [
    SKILL_READ,
])

### 2.
# 基于属性的访问控制(ABAC)
# (Attribute-based access control.)
#### 2.1 属性定义
# (Section 2.1: attribute definitions.)


class Attribute:
    """A single named value attached to a user, a resource or the environment."""

    def __init__(self, name, value):
        self.name, self.value = name, value


class UserAttributes:
    """Attributes of the requesting user, each wrapped in an ``Attribute``."""

    def __init__(self, user_id, role, department, level):
        supplied = {
            "user_id": user_id,
            "role": role,
            "department": department,
            "level": level,
        }
        # Each attribute is stored under, and labelled with, its own field name.
        for label, raw in supplied.items():
            setattr(self, label, Attribute(label, raw))


class ResourceAttributes:
    """Attributes of the resource being accessed, each wrapped in an ``Attribute``."""

    def __init__(self, resource_id, owner, sensitivity, classification):
        supplied = {
            "resource_id": resource_id,
            "owner": owner,
            "sensitivity": sensitivity,
            "classification": classification,
        }
        for label, raw in supplied.items():
            setattr(self, label, Attribute(label, raw))


class EnvironmentAttributes:
    """Attributes of the request context, each wrapped in an ``Attribute``."""

    def __init__(self, time, location, device):
        supplied = {"time": time, "location": location, "device": device}
        for label, raw in supplied.items():
            setattr(self, label, Attribute(label, raw))


#### 2.2 策略定义
# (Section 2.2: policy definitions.)


class AccessPolicy:
    """A named rule: when ``condition(user, resource, env)`` holds, ``effect`` applies.

    Args:
        name: Policy identifier.
        condition: Callable taking ``(user, resource, env)`` attribute objects.
        effect: The outcome string; the sample policies use ``"allow"`` and ``"deny"``.
    """

    def __init__(self, name, condition, effect):
        self.name, self.condition, self.effect = name, condition, effect


def _is_admin(user, resource, env):
    """True when the user's role is "admin"."""
    return user.role.value == "admin"


def _is_owner(user, resource, env):
    """True when the user's id equals the resource's owner."""
    return user.user_id.value == resource.owner.value


def _is_high_sensitivity_for_low_level(user, resource, env):
    """True for a "high" sensitivity resource requested by a user below level 3."""
    return resource.sensitivity.value == "high" and user.level.value < 3


def _is_outside_business_hours(user, resource, env):
    """True when the environment time's hour is before 9 or at/after 17."""
    hour = env.time.value.hour
    return hour < 9 or hour >= 17


# Sample policies
POLICIES = [
    AccessPolicy(name="admin_full_access", condition=_is_admin, effect="allow"),
    AccessPolicy(name="owner_access", condition=_is_owner, effect="allow"),
    AccessPolicy(
        name="sensitivity_restriction",
        condition=_is_high_sensitivity_for_low_level,
        effect="deny",
    ),
    AccessPolicy(
        name="business_hours_only",
        condition=_is_outside_business_hours,
        effect="deny",
    ),
]

### 3.
# 权限检查
# (Permission checks.)
#### 3.1 RBAC 检查
# (Section 3.1: RBAC checks.)


class RBACChecker:
    """Answer permission questions from the roles assigned to each user."""

    def __init__(self):
        self.user_roles = {}
        self.role_permissions = {}

    @staticmethod
    def _name_of(permission):
        """Return the permission's ``name``, or the value itself if it has none."""
        return getattr(permission, "name", permission)

    def assign_role(self, user_id, role):
        """Add ``role`` to the roles held by ``user_id``."""
        self.user_roles.setdefault(user_id, []).append(role)

    def check_permission(self, user_id, permission):
        """Return True if any role of ``user_id`` grants ``permission``.

        Permissions are matched by name, so an equivalent object built
        separately, or the bare name string, matches the one stored on the
        role. Users with no assigned role get False.

        Args:
            user_id: The user to check.
            permission: A permission object with a ``name`` attribute, or a
                permission name string such as ``"skill:read"``.
        """
        wanted = self._name_of(permission)
        return any(
            self._name_of(granted) == wanted
            for role in self.user_roles.get(user_id, [])
            for granted in role.permissions
        )

    def check_skill_permission(self, user_id, skill_id, action):
        """Return True if ``user_id`` holds the ``skill:<action>`` permission.

        ``skill_id`` is accepted but not consulted. The check goes by name
        because comparing a newly created permission object by identity
        against the ones stored on roles can never succeed.
        """
        return self.check_permission(user_id, f"skill:{action}")


#### 3.2 ABAC 检查
# (Section 3.2: ABAC checks.)


class ABACChecker:
    """Evaluate access against an ordered list of policies."""

    def __init__(self):
        self.policies = []

    def add_policy(self, policy):
        """Append ``policy``; policies are evaluated in insertion order."""
        self.policies.append(policy)

    def check_access(self, user, resource, environment):
        """Return the verdict of the first policy whose condition holds.

        The first matching policy decides: True if its effect is "allow",
        otherwise False. Later policies are not evaluated. When no policy
        matches, access is denied.
        """
        decisive = next(
            (
                policy
                for policy in self.policies
                if policy.condition(user, resource, environment)
            ),
            None,
        )
        return decisive is not None and decisive.effect == "allow"

    def check_skill_access(self, user, skill, environment):
        """Check access to ``skill``, treating it as the resource."""
        return self.check_access(user, skill, environment)


## 资源访问控制
# (Resource access control.)
### 1. 文件系统访问
# (1. File-system access.)
#### 1.1 文件权限
# (Section 1.1: file permissions.)


class FilePermission:
    """Read/write/execute flags recorded for one path."""

    def __init__(self, path, read, write, execute):
        self.path = path
        self.read = read
        self.write = write
        self.execute = execute


class FileAccessController:
    """Per-path permission flags with an owner override."""

    def __init__(self):
        self.permissions = {}

    def set_permission(self, path, read=False, write=False, execute=False):
        """Record (or replace) the flags for ``path``."""
        self.permissions[path] = FilePermission(path, read, write, execute)

    def _flag_or_owner(self, user_id, path, flag):
        """True if ``path`` has ``flag`` set or ``user_id`` owns it; False for unknown paths."""
        entry = self.permissions.get(path)
        if not entry:
            return False
        # The ownership lookup only runs when the flag itself is not set.
        return bool(getattr(entry, flag) or self.is_owner(user_id, path))

    def check_read(self, user_id, path):
        """Return True if ``path`` is readable or ``user_id`` owns it."""
        return self._flag_or_owner(user_id, path, "read")

    def check_write(self, user_id, path):
        """Return True if ``path`` is writable or ``user_id`` owns it."""
        return self._flag_or_owner(user_id, path, "write")

    def is_owner(self, user_id, path):
        """Placeholder ownership check; returns None (treated as "not owner")."""
        pass
#### 1.2 目录访问
class DirectoryAccessController:
    """Grant directory access by explicit user list or by role list."""

    def __init__(self):
        self.access_rules = {}

    def set_access_rule(self, directory, allowed_users, allowed_roles):
        """Record (or replace) who may access ``directory``."""
        rule = {"users": allowed_users, "roles": allowed_roles}
        self.access_rules[directory] = rule

    def check_access(self, user_id, user_roles, directory):
        """Return True if the user, or any of the user's roles, is allowed.

        Directories without a rule are denied.
        """
        rule = self.access_rules.get(directory)
        if not rule:
            return False
        if user_id in rule["users"]:
            return True
        permitted_roles = rule["roles"]
        return any(candidate in permitted_roles for candidate in user_roles)
### 2. API 访问控制
# (2. API access control.)
#### 2.1 API 权限
# (Section 2.1: API permissions.)


class APIPermission:
    """The permissions required to call one endpoint with one HTTP method."""

    def __init__(self, endpoint, method, required_permissions):
        self.endpoint, self.method = endpoint, method
        self.required_permissions = required_permissions


class APIAccessController:
    """Registry of endpoint requirements, keyed by ``"<method>:<endpoint>"``."""

    def __init__(self):
        self.api_permissions = {}

    def register_endpoint(self, endpoint, method, required_permissions):
        """Record (or replace) the permissions required for ``method`` on ``endpoint``."""
        entry = APIPermission(endpoint, method, required_permissions)
        self.api_permissions[f"{method}:{endpoint}"] = entry

    def check_access(self, user_id, endpoint, method):
        """Return True only if the user holds every permission the endpoint requires.

        Unregistered endpoints are denied. An endpoint registered with no
        required permissions is allowed.
        """
        entry = self.api_permissions.get(f"{method}:{endpoint}")
        if not entry:
            return False
        return all(
            self.has_permission(user_id, needed)
            for needed in entry.required_permissions
        )

    def has_permission(self, user_id, permission):
        """Placeholder permission lookup; returns None (treated as "not held")."""
        pass
#### 2.2 速率限制
class RateLimiter:
    """Sliding-window request counter, per user and per endpoint.

    NOTE(review): accepted requests are appended to ``self.requests`` and
    never removed, so each user's history grows for the life of the object.
    """

    def __init__(self):
        self.requests = defaultdict(list)

    def check_rate_limit(self, user_id, endpoint, limit, window):
        """Record the request and return True, or return False if over the limit.

        Args:
            user_id: Key under which the request history is kept.
            endpoint: Only earlier requests to this endpoint are counted.
            limit: Maximum number of requests allowed inside the window.
            window: Window length in seconds, counted back from now.

        Returns:
            False (nothing recorded) when ``limit`` or more requests to
            ``endpoint`` already fall inside the window; True otherwise.
        """
        current = datetime.now()
        cutoff = current - timedelta(seconds=window)
        history = self.requests[user_id]

        hits = sum(
            1
            for entry in history
            if entry["timestamp"] >= cutoff and entry["endpoint"] == endpoint
        )
        if hits >= limit:
            return False

        history.append({"endpoint": endpoint, "timestamp": current})
        return True
### 3. 数据访问控制

#### 3.1 数据库访问
class DatabaseAccessController:
    """Per-table switches for select / insert / update / delete.

    The ``user_id`` argument of the check methods is accepted but not
    consulted; the answer depends on the table's recorded flags only.
    """

    def __init__(self):
        self.table_permissions = {}

    def set_table_permission(self, table, permissions):
        """Record (or replace) the operation flags mapping for ``table``."""
        self.table_permissions[table] = permissions

    def _operation_flag(self, table, operation):
        """Return the flag stored for ``operation`` on ``table``; False if absent."""
        flags = self.table_permissions.get(table)
        return flags.get(operation, False) if flags else False

    def check_select(self, user_id, table):
        """Return the table's "select" flag (False when unset)."""
        return self._operation_flag(table, "select")

    def check_insert(self, user_id, table):
        """Return the table's "insert" flag (False when unset)."""
        return self._operation_flag(table, "insert")

    def check_update(self, user_id, table):
        """Return the table's "update" flag (False when unset)."""
        return self._operation_flag(table, "update")

    def check_delete(self, user_id, table):
        """Return the table's "delete" flag (False when unset)."""
        return self._operation_flag(table, "delete")


#### 3.2 字段级访问
# (Section 3.2: field-level access.)


class FieldAccessController:
    """Role allow-lists per column, keyed by ``"<table>.<field>"``."""

    def __init__(self):
        self.field_permissions = {}

    def set_field_permission(self, table, field, allowed_roles):
        """Record (or replace) the roles allowed to access ``table.field``."""
        self.field_permissions[f"{table}.{field}"] = allowed_roles

    def check_field_access(self, user_roles, table, field):
        """Return True if any of ``user_roles`` is allowed on ``table.field``.

        Fields with no recorded allow-list are denied.
        """
        permitted = self.field_permissions.get(f"{table}.{field}", [])
        return any(candidate in permitted for candidate in user_roles)

    def filter_fields(self, user_roles, table, fields):
        """Return the subset of ``fields`` the roles may access, in input order."""
        return [
            column
            for column in fields
            if self.check_field_access(user_roles, table, column)
        ]
## Skill 权限管理

### 1. Skill 创建权限
class SkillCreationManager:
    """Decide whether a user may create a Skill from a given definition."""

    def __init__(self, permission_checker):
        self.permission_checker = permission_checker

    def can_create_skill(self, user_id, skill_definition):
        """Return True only if all three creation checks pass.

        Checked in order: the user holds ``SKILL_CREATE``, the definition is
        well formed, and the user may access every resource it lists.
        """
        return bool(
            self.permission_checker.check_permission(user_id, SKILL_CREATE)
            and self.validate_skill_definition(skill_definition)
            and self.check_resource_access(user_id, skill_definition)
        )

    def validate_skill_definition(self, definition):
        """Return True if ``definition`` has "name", "version" and "description"."""
        required_fields = ("name", "version", "description")
        return all(field in definition for field in required_fields)

    def check_resource_access(self, user_id, definition):
        """Return True if the user may access every entry of ``definition["resources"]``.

        A definition without a "resources" key passes.
        """
        return all(
            self.permission_checker.check_resource_access(user_id, resource)
            for resource in definition.get("resources", [])
        )


### 2. Skill 执行权限
# (2. Skill execution permission.)


class SkillExecutionManager:
    """Decide whether a user may run a Skill with given parameters."""

    def __init__(self, permission_checker):
        self.permission_checker = permission_checker

    def get_skill(self, skill_id):
        """Look up a Skill by id; subclasses must provide the storage lookup.

        ``can_execute_skill`` calls this hook. It is declared here so the
        dependency is explicit rather than surfacing as an AttributeError.

        Raises:
            NotImplementedError: Always, until overridden.
        """
        raise NotImplementedError("get_skill must be provided by a subclass")

    def can_execute_skill(self, user_id, skill_id, parameters):
        """Return True only if every execution check passes.

        Checked in order: the user holds ``SKILL_EXECUTE``, the Skill exists,
        the user holds each permission the Skill requires, and the user may
        access every resource referenced by ``parameters``.
        """
        if not self.permission_checker.check_permission(user_id, SKILL_EXECUTE):
            return False

        skill = self.get_skill(skill_id)
        if not skill:
            return False

        return bool(
            self.check_skill_specific_permission(user_id, skill)
            and self.check_parameter_permissions(user_id, skill, parameters)
        )

    def check_skill_specific_permission(self, user_id, skill):
        """Return True if the user holds all of ``skill.get_required_permissions()``."""
        return all(
            self.permission_checker.check_permission(user_id, permission)
            for permission in skill.get_required_permissions()
        )

    def check_parameter_permissions(self, user_id, skill, parameters):
        """Return True if the user may access every resource named in ``parameters``.

        Only parameter values that are resource references are checked;
        parameter names and ``skill`` are not consulted.
        """
        return all(
            self.permission_checker.check_resource_access(user_id, value)
            for value in parameters.values()
            if self.is_resource_reference(value)
        )

    def is_resource_reference(self, value):
        """Return True for strings that start with ``"resource://"``."""
        return isinstance(value, str) and value.startswith("resource://")
### 3. Skill 修改权限
class SkillModificationManager:
    """Decide whether a user may modify an existing Skill."""

    def __init__(self, permission_checker):
        self.permission_checker = permission_checker

    def get_skill(self, skill_id):
        """Look up a Skill by id; subclasses must provide the storage lookup.

        ``can_modify_skill`` calls this hook. It is declared here so the
        dependency is explicit rather than surfacing as an AttributeError.

        Raises:
            NotImplementedError: Always, until overridden.
        """
        raise NotImplementedError("get_skill must be provided by a subclass")

    def can_modify_skill(self, user_id, skill_id):
        """Return True if the user holds ``SKILL_UPDATE`` and owns the Skill.

        Returns False when the Skill does not exist.
        """
        if not self.permission_checker.check_permission(user_id, SKILL_UPDATE):
            return False

        skill = self.get_skill(skill_id)
        if not skill:
            return False

        return self.is_skill_owner(user_id, skill)

    def is_skill_owner(self, user_id, skill):
        """Return True if ``skill.owner_id`` equals ``user_id``."""
        return skill.owner_id == user_id


### 4. Skill 删除权限
# (4. Skill deletion permission.)


class SkillDeletionManager:
    """Decide whether a user may delete an existing Skill."""

    def __init__(self, permission_checker):
        self.permission_checker = permission_checker

    def get_skill(self, skill_id):
        """Look up a Skill by id; subclasses must provide the storage lookup.

        ``can_delete_skill`` calls this hook. It is declared here so the
        dependency is explicit rather than surfacing as an AttributeError.

        Raises:
            NotImplementedError: Always, until overridden.
        """
        raise NotImplementedError("get_skill must be provided by a subclass")

    def can_delete_skill(self, user_id, skill_id):
        """Return True only if every deletion check passes.

        Checked in order: the user holds ``SKILL_DELETE``, the Skill exists,
        the user owns it or is an admin, and no other Skill depends on it.
        """
        if not self.permission_checker.check_permission(user_id, SKILL_DELETE):
            return False

        skill = self.get_skill(skill_id)
        if not skill:
            return False

        if not (self.is_skill_owner(user_id, skill)
                or self.permission_checker.is_admin(user_id)):
            return False

        return not self.has_dependents(skill_id)

    def is_skill_owner(self, user_id, skill):
        """Return True if ``skill.owner_id`` equals ``user_id``."""
        return skill.owner_id == user_id

    def has_dependents(self, skill_id):
        """Placeholder dependency check; returns None (treated as "no dependents")."""
        pass


## 审计日志
# (Audit logging.)
### 1. 访问日志
# (1. Access log.)


class AccessLogger:
    """In-memory log of access attempts."""

    def __init__(self):
        self.logs = []

    def log_access(self, user_id, resource, action, result):
        """Append one entry, timestamped with the current local time."""
        self.logs.append({
            "timestamp": datetime.now(),
            "user_id": user_id,
            "resource": resource,
            "action": action,
            "result": result,
        })

    def get_access_logs(self, user_id=None, resource=None, action=None):
        """Return the entries matching every filter that is not None.

        A filter is skipped only when it is None, so falsy values such as
        ``user_id=0`` are honoured. The result is always a new list, so
        callers cannot modify the logger's own storage through it.
        """
        wanted = {
            field: value
            for field, value in (
                ("user_id", user_id),
                ("resource", resource),
                ("action", action),
            )
            if value is not None
        }
        return [
            entry
            for entry in self.logs
            if all(entry[field] == value for field, value in wanted.items())
        ]


### 2.
# 权限变更日志
# (Permission change log.)


class PermissionChangeLogger:
    """In-memory log of permission changes."""

    def __init__(self):
        self.changes = []

    def log_permission_change(self, user_id, target, old_permissions,
                              new_permissions, changed_by):
        """Append one change entry, timestamped with the current local time."""
        self.changes.append({
            "timestamp": datetime.now(),
            "user_id": user_id,
            "target": target,
            "old_permissions": old_permissions,
            "new_permissions": new_permissions,
            "changed_by": changed_by,
        })

    def get_permission_changes(self, user_id=None, target=None):
        """Return the entries matching every filter that is not None.

        A filter is skipped only when it is None, so falsy values such as
        ``user_id=0`` are honoured. The result is always a new list, so
        callers cannot modify the logger's own storage through it.
        """
        wanted = {
            field: value
            for field, value in (("user_id", user_id), ("target", target))
            if value is not None
        }
        return [
            entry
            for entry in self.changes
            if all(entry[field] == value for field, value in wanted.items())
        ]